# Historical Data Export

__Notebook Version:__ 1.0<br>
__Python Version:__ Python 3.8<br>
__Required Packages:__ azure-monitor-query, azure-storage-file-datalake, azureml-synapse<br>
__Platforms Supported:__  Azure Machine Learning Notebooks connected to Azure Synapse Workspace
     
__Data Source Required:__ Yes

__Data Source:__ Azure Log Analytics

__Spark Version:__ 3.1 or above
    
## Description

Use this notebook to export of historical data in your Log Analytics workspace. 
(This notebook extends the **continuous log export tool** in Microsoft Sentinel (see [docs](https://docs.microsoft.com/azure/azure-monitor/logs/logs-data-export?tabs=portal) 
and [blog post](https://techcommunity.microsoft.com/t5/microsoft-sentinel-blog/configure-a-continuous-data-pipeline-in-microsoft-sentinel-for/ba-p/3242605)), 
with historical logs exported using the same data format and partition scheme as the continuously exported logs.) 

In this notebook, we demo the **one-time export of historical log data** from the _SigninLogs_ table, but this can easily be modified to target **any subset of log data**. The notebook only needs to be **run once**, following which exported logs can be used in Sentinel notebooks for large scale data analytics and machine learning on log data (see the Sentinel template notebooks for more details). 
Alternatively, you may wish to use this tool for archiving older logs in more cost-effective archive-tier Azure storage.

This notebook also makes use of the **Azure Synapse integration** for Sentinel notebooks to enable partitioning and writing of data files at scale. To set up the Synapse integration for Sentinel noteboks, please follow the instructions [here](https://docs.microsoft.com/azure/sentinel/notebooks-with-synapse).

## Steps

1. Fetch data from the Sentinel workspace using the `azure-monitor-query` Python package.  
   - The requested data may be the entirety of a given table, or based on a custom KQL query
   - Querying and fetching of data is automatically chunked and run asynchronously to avoid API throttling issues

2. Write data to Azure Data Lake Gen2

3. Use Apache Spark (via Azure Synapse) to repartition data to match the partition scheme created by the continuous log export tool
   - The continuous log export tool created a separate nested directory for each (year, month, day, hour, 5-minute interval) tuple in the format: **`{base_path}/y=<year>/m=<month>/d=<day>/h=<hour>/m=<5-minute-interval>`**
   - For a year's worth of historical log data, we may be writing over 100,000 separate data files, so we rely on Spark's multi-executor parallelism to do this efficiently

4. Use the `azure-storage-file-datalake` Python package to do rename a few high-level directories to match the partition scheme used by the continuous log export tool

5. Clean up any data stored in intermediate locations during the data ETL process

# Install Pre-Requisite Packages

In [None]:
import sys

# Install Azure Monitor Query client package to query Sentinel log data
!{sys.executable} -m pip install --upgrade azure-monitor-query

In [None]:
# Install Azure storage datalake library to manipulate file systems
!{sys.executable} -m pip install --upgrade azure-storage-file-datalake --pre

In [None]:
# Install AzureML Synapse package to use spark magic
!{sys.executable} -m pip install --upgrade azureml-synapse

# 0. Setup

Authenticate and connect to your Log Analytics workspace. The workspace ID is loaded from the `config.json` file which will have been created when you set up your first Sentinel notebook.

In [None]:
import asyncio
import io
import itertools
import json
from math import ceil
import os
from pathlib import PurePosixPath


import pandas as pd
from datetime import datetime, timedelta, timezone


from azure.core.exceptions import HttpResponseError
from azure.monitor.query import LogsQueryStatus, LogsBatchQuery
from azure.monitor.query.aio import LogsQueryClient
from azure.identity.aio import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient
from azureml.core import Workspace, LinkedService


# Credential used to authenticate to Log Analytics API
credential = DefaultAzureCredential()
logs_client = LogsQueryClient(credential)

with open("config.json", "r") as f:
    LOG_ANALYTICS_WORKSPACE_ID = json.load(f).get("workspace_id", "")

# Use this if a `config.json` has not been created automatically:
# LOG_ANALYTICS_WORKSPACE_ID = '<your Log Analytics / Sentinel workspace ID>'

We now define functions that we will use to query and fetch the historical log data to export/archive. We use the `azure-monitor-query` Python package to invoke the Log Analytics REST API. 
Queries are chunked by time range to avoid [throttling and truncation of returned data](https://docs.microsoft.com/azure/azure-monitor/service-limits#log-analytics-workspaces) and the chunked API calls are executed asynchronously.

**Edit the KQL query in the cell below as appropriate to specify which logs/columns to query; to specify all the available logs for a given table, just set `QUERY = <name-of-table>`.**

In [None]:
# EDIT THIS KQL AS REQUIRED!!!
QUERY = "SigninLogs"  # Add any `project` statement and/or filtering here!!!

In [None]:
async def async_execute_query(
    query: str, 
    start_time: datetime, 
    end_time: datetime, 
    *, 
    retries: int = 1, 
    query_num: int = None, 
    print_message: str = None
):
    """
    Asyncrhonously execute the given query, restricted to the given time range, and parse the API response
    """
    if query_num is None:
        query_num = ""

    for i in range(retries + 1):
        if print_message is not None:
            print(print_message)
        try:
            response = await logs_client.query_workspace(
                workspace_id=LOG_ANALYTICS_WORKSPACE_ID,
                query=query,
                timespan=(start_time, end_time),
                server_timeout=600,                 
                # include_statistics=True,          # Use for debugging!
            )
        except HttpResponseError as e:
            print(f"Fatal error when attempting query {query_num} (query time span: {start_time} to {end_time}):\n\t", e)
            print_message = f"Attempt {i + 2} of {retries + 1}. {print_message}"
            continue

        if response.status == LogsQueryStatus.SUCCESS:
            print(f"Query {query_num} successful (query time span: {start_time} to {end_time})")
            return response.tables[0]
        elif response.status == LogsQueryStatus.PARTIAL:
            # this will be a LogsQueryPartialResult
            error = response.partial_error
            print(f"Partial results returned for query {query_num} (query time span: {start_time} to {end_time}):\n\t", error.message)
            if i == retries + 1:
                return response.partial_data[0]
        elif response.status == LogsQueryStatus.FAILURE:
            # this will be a LogsQueryError
            print(f"Query {query_num} failed (query time span: {start_time} to {end_time}):\n\t", response.message)
        else:
            print(f"Unknown error in query {query_num} (query time span: {start_time} to {end_time})")

        print_message = f"Attempt {i + 2} of {retries + 1}. {print_message}"


async def batch_endpoints_by_row_count(
    end_time: datetime, 
    days_back: int, 
    max_rows_per_query: int = int(1e5),  # Maximum is supposedly 500,000, but that doesn't seem correct
    time_col: str = "TimeGenerated",
):
    """
    Determine the timestamp endpoints for each chunked query, 
    such that number of rows returned by each query is (approximately) `max_rows_rows_per_query`
    """
    find_batch_endpoints_query = f"""
        {QUERY}
        | sort by {time_col} desc
        | extend batch_num = row_cumsum(1) / {max_rows_per_query}
        | summarize endpoint=min({time_col}) by batch_num
        | sort by batch_num asc
        | project endpoint
    """
    
    start_time = end_time - timedelta(days=days_back)
    response = await logs_client.query_workspace(
        workspace_id=LOG_ANALYTICS_WORKSPACE_ID,
        query=find_batch_endpoints_query,
        timespan=(start_time, end_time)
    )

    batch_endpoints = [end_time]
    batch_endpoints += [row[0] for row in response.tables[0].rows]
    return batch_endpoints


async def auto_find_batch_endpoints(
    end_time: datetime, 
    days_back: int, 
    time_col: str = "TimeGenerated",
):
    """
    Determine the timestamp endpoints for each chunked query, 
    such that number of rows returned by each query is less that the API limit (500K)
    and the size of the data returned is less than the API limit (~100 MiB).

    Aims to achieve the above without creating an excessive number of chunks - 
    worst case performance is double the theoretical minimum number of queries necessary
    """
    max_bytes_per_query = 100 * 1024 * 1024  # 100 MiB
    max_bytes_per_query = int(0.8 * max_bytes_per_query)  # Limit is not exact (depends on data compression ratio), so leaving so wiggle-room here

    max_rows_per_query = int(5e5)  # 500K
    max_rows_per_query = int(0.9 * max_rows_per_query)  # Sometimes we will go over the limit if many events share the same timestamp

    find_batch_endpoints_by_data_limit_query = f"""
        {QUERY}
        | sort by {time_col} desc
        | extend batch_num = row_cumsum(estimate_data_size(*)) / {max_bytes_per_query}
        | summarize endpoint=min({time_col}) by batch_num
        | sort by batch_num asc
        | project endpoint
    """
    
    start_time = end_time - timedelta(days=days_back)
    response = await logs_client.query_workspace(
        workspace_id=LOG_ANALYTICS_WORKSPACE_ID,
        query=find_batch_endpoints_by_data_limit_query,
        timespan=(start_time, end_time)
    )

    batch_endpoints_by_data_limit = [end_time]
    batch_endpoints_by_data_limit += [row[0] for row in response.tables[0].rows]

    batch_endpoints_by_row_limit = await batch_endpoints_by_row_count(end_time, days_back, max_rows_per_query)

    batch_endpoints = sorted(batch_endpoints_by_data_limit + batch_endpoints_by_row_limit, reverse=True)
    return batch_endpoints


async def fetch_logs(
    end_time: datetime, 
    days_back: int, 
    *,
    auto_batch: bool = False, 
    batch_size_rows: int = None, 
    batch_size_days:float = None,
):
    """
    Batch the query into time intervals of size `batch_size_days` and perform asyncrhonous Log Analytics API calls for each batch.

    Returns a pandas dataframe containing queries data.
    """
    query = QUERY

    if auto_batch:
        print("Dynamically determining how to chunk up queries based on API row and data limits...")
        batch_endpoints = await auto_find_batch_endpoints(end_time, days_back)
        num_queries = len(batch_endpoints) - 1
    elif batch_size_rows is not None:
        print("Dynamically determining how to chunk up queries...")
        batch_endpoints = await batch_endpoints_by_row_count(end_time, days_back, batch_size_rows)
        num_queries = len(batch_endpoints) - 1
    elif batch_size_days is not None:
        num_queries = ceil(days_back / batch_size_days)
        batch_endpoints = [end_time - (i * timedelta(days=batch_size_days)) for i in range(num_queries + 1)]
    else:
        print("one of the parameters, `auto_batch`, `batch_size_rows`, `batch_size_days` must be set")
        return

    async_queries = []
    for i in range(num_queries):
        query_end_time = batch_endpoints[i]
        query_start_time = batch_endpoints[i + 1]
        print_message = f"Submitting Query {i + 1} of {num_queries}: {query_start_time} to {query_end_time}"
        async_queries.append(async_execute_query(query, query_start_time, query_end_time, print_message=print_message, query_num=i+1))
        
        # Use this instead of the above query for debugging issues with API limits - this will return the row count and data size of each chunk (instead of the data itself)
        # async_queries.append(async_execute_query(f"{query} | summarize query_num = max({i}), count(), est_data_size = sum(estimate_data_size(*))", query_start_time, query_end_time))

    results = await asyncio.gather(*async_queries)
    columns = results[0].columns
    rows = itertools.chain.from_iterable([table.rows for table in results if table is not None])
    df = pd.DataFrame(columns=columns, data=rows)
    return df

async def estimate_data_size(
    end_time: datetime, 
    days_back: int,
):
    query = f"{QUERY} | summarize n_rows = count(), estimate_data_size = sum(estimate_data_size(*))"
    start_time = end_time - timedelta(days=days_back)
    response = await logs_client.query_workspace(
        workspace_id=LOG_ANALYTICS_WORKSPACE_ID,
        query=query,
        timespan=(start_time, end_time)
    )

    columns = response.tables[0].columns
    rows = response.tables[0].rows
    df = pd.DataFrame(columns=columns, data=rows)
    return df

Here, we define the functions that will be used to interact with the ADLS storage account(s) to which we want to export our historical log data. These use the `azure-storage-file-datalake` Python package which uses the [ADLS Gen2 REST API](https://docs.microsoft.com/azure/storage/blobs/data-lake-storage-directory-file-acl-python) under the hood.

**Nothing to change here (just boilerplate) - you can just run this cell as-is!**

In [None]:
# This pattern avoids re-authenticating every time the ADLS client needs to be used
global adls_service_client
adls_service_client = None

def get_adls_client(storage_account_name, storage_account_key, force_new_client=False):
    """
    Authenticates to ADLS API and instantiates an ADLS client object.
    Subsequent calls to this function return the same client object unless `force_new_client` is set.
    """

    # Authenticate and instantiate new ADLS client the first time this functino is called
    if (not force_new_client) and (adls_service_client is not None):
        return adls_service_client
    
    try:
        return DataLakeServiceClient(
            account_url='{}://{}.dfs.core.windows.net'.format(
                'https', storage_account_name
            ),
            credential=storage_account_key,
        )
    except Exception as e:
        print(e)


def upload_df_to_adls_path(
    df: pd.DataFrame, 
    adls_dirname: str,
    adls_filename: str,
    container_name: str,
    storage_account_name: str, 
    storage_account_key: str,
):
    """
    Upload a pandas dataframe to the specified ADLS path as a single JSON lines files
    """
    json_data = df.to_json(orient='records', lines=True, date_format='iso')

    adls_service_client = get_adls_client(storage_account_name, storage_account_key)
    file_system_client = adls_service_client.get_file_system_client(file_system=container_name)

    try:
        file_system_client.create_directory(adls_dirname)
    except Exception as e:
        print(e)

    try:
        directory_client = file_system_client.get_directory_client(adls_dirname)
        file_client = directory_client.get_file_client(adls_filename)
        file_client.upload_data(json_data, overwrite=True)
    except Exception as e:
        print(e)


def delete_directory(
    adls_dirname: str,
    container_name: str,
    storage_account_name: str, 
    storage_account_key: str,
):
    """
    Delete the specified ALDS directory
    """
    try:
        adls_service_client = get_adls_client(storage_account_name, storage_account_key)
        file_system_client = adls_service_client.get_file_system_client(file_system=container_name)
        directory_client = file_system_client.get_directory_client(adls_dirname)
        directory_client.delete_directory()
    except Exception as e:
        print(e)
    

def get_month_partition_dirs(
    adls_root_search_path: str, 
    container_name: str,
    storage_account_name: str, 
    storage_account_key: str,
):
    """
    Searches for all directories under the `adls_root_search_path` that have the prefix 'month='.

    Returns a list of matching directory paths.
    """
    month_directories = []
    try:
        adls_service_client = get_adls_client(storage_account_name, storage_account_key)
        file_system_client = adls_service_client.get_file_system_client(file_system=container_name)
        paths_response = file_system_client.get_paths(path=adls_root_search_path)

    except Exception as e:
        print(e)
        return

    for path in paths_response:
        if path.is_directory and path.name.split('/')[-1].startswith('month='):
            month_directories.append(path.name)

    return month_directories


def rename_month_partition_dirs(
    adls_root_search_path: str, 
    container_name: str,
    storage_account_name: str, 
    storage_account_key: str,
):
    """
    Searches for all directories under the `adls_root_search_path` that have the prefix 'month=' and replaces the prefix with 'm='.
    (This is used to modify the partition scheme produced by PySpark to match the naming scheme used by the Sentinel continuous data export tool.)
    """
    month_directories = get_month_partition_dirs(adls_root_search_path, container_name, storage_account_name, storage_account_key)

    for month_dir in month_directories:
        path = PurePosixPath(month_dir)
        if not path.stem.startswith('month='):
            continue
        
        new_stem = f'm={path.stem[len("month="):]}'
        new_path = str(PurePosixPath(path.parent, new_stem))
        
        print(f'Renaming {path} to {new_path}')
        try:
            adls_service_client = get_adls_client(storage_account_name, storage_account_key)
            file_system_client = adls_service_client.get_file_system_client(file_system=container_name)
            directory_client = file_system_client.get_directory_client(month_dir)
            directory_client.rename_directory(new_name=directory_client.file_system_name + '/' + new_path)
        except Exception as e:
            print(e)

# 1. Fetching Log Data

You may wish to estimate costs that will be incurred on your Azure storage account before beginning the data export process. The query in the cell below will estimate the size of the data to be exported along with the number of blobs that would be written 
if you choose to partition data into 5-minute intervals (as is done by the continuous log export). 
Use this in conjunction with the Azure storage [pricing calculator](https://azure.microsoft.com/pricing/calculator/?service=storage) to determine costs that will be incurred for your storage setup. 
(Full billing details for ADLS Gen2 can be found [here](https://azure.microsoft.com/pricing/details/storage/data-lake/).)

Use the query in the next cell to inform how much data to export.

In [None]:
# Set values as appropriate
end_time = datetime.strptime("2022-04-01 00:00:00 +0000", "%Y-%m-%d %H:%M:%S %z")
days_back = 30  # How far back before the `end_date` to query logs

data_size_df = await estimate_data_size(end_time, days_back)  # type: ignore
data_size_df["estimate_data_size_MB"] = data_size_df["estimate_data_size"] / (1000 **2)
data_size_df["estimate_data_size_GB"] = data_size_df["estimate_data_size_MB"] / 1000

# Section 3 of this notebook partitions data into 5 minute buckets by default (to match the continuous data export)
data_size_df["n_blobs_if_using_5_min_partitions"] = ceil(timedelta(days=days_back) / timedelta(minutes=5))

data_size_df

This notebook fetches historical log data (via asynchronous, chunked queries to the log analytics [REST API](https://docs.microsoft.com/python/api/overview/azure/monitor-query-readme?view=azure-python)). 

Variables to set:
- `end_time` - The datetime up to which logs should be fetched. If you have already set up a continuous export pipeline, **ensure that this is set to an earlier date than the start of the continuously exported data**.
- `days_back` - How far back before the `end_date` to query logs   
- `batch_size_rows` OR `batch_size_days` - How many rows/fractional days of data to query in each API call

Log Analytics workspace and log queries in Azure Monitor are multitenancy services that include limits to protect and isolate customers and to maintain quality of service. When querying for a large amount of data, you should consider the following limits

| Limit _(as of May 2022)_                       | Remedy                                                                                                                                                                             |
|------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Maximum of 500,000 rows returned per query     | Set `batch_size_rows` to <490K (or reduce value of `batch_size_days`)                                                                                                              |
| Maximum of ~100 MiB of data returned per query | - Reduce the set of columns being exported. This can have a significant performance impact!<br>- If number of columns can't be reduced, reduce `batch_size_rows`/`batch_size_days` |
| Maximum of 200 requests per 30 seconds         | - Increase `batch_size_rows`/`batch_size_days`<br>- Reduce `days_back`<br>- Try again! (Some 'Server Disconnected' errors are ephemeral and not due to the rate limit being hit)   |

**EXPERIMENTAL:** Setting `auto_batch = True` will cause the query function to intelligently chunk queries based on rows returned and estimated data size, so as to avoid API limits.

Also consider that the fetched data will (initially) be held in memory on your Azure ML compute instance; depending on the volume of historical logs you wish to export, you may reach VM memory limits.<br>
If this is a problem, consider exporting the data in chunks - e.g. instead of exporting 365 days of data at once export 100 days of data at a time by setting the values of `end_time` and `days_back` appropriately and re-running the notebook from this cell onwards for each separate chunk.<br>
Alternatively, use a compute instance with more memory to run this notebook (such as the [Azure E-Series VMs](https://azure.microsoft.com/pricing/details/virtual-machines/series/)).


> **Note:** This cell may take a while to run!

In [None]:
# Set values as appropriate
end_time = datetime.strptime("2022-04-01 00:00:00 +0000", "%Y-%m-%d %H:%M:%S %z")
days_back = 30  # How far back before the `end_date` to query logs

# Use one of the strategies below to chunk queries OR use `auto_batch = True`
batch_size_rows = int(4e5)  # How many rows of data to return in each API call, max. 500K (see notes above)
batch_size_days = 3  # How many (fractional) days of data to query in each API call (see notes above)

df = await fetch_logs(
    end_time=end_time, 
    days_back=days_back, 
    auto_batch=True,
    # batch_size_rows=batch_size_rows,
    # batch_size_days=batch_size_days,
)  # type: ignore

print("# Rows retrieved:", len(df))
with pd.option_context("display.max_columns", None):
    df.head()

> **Before continuing**, check the output from the cell above:
> - Ensure that the dataframe in the output of the cell above contains the expected data
> - Ensure that there are no query failure or data truncation error messages in the output of the cell above

# 2. Write Data to ADLS

In the cell below, specify the details of the storage account and file to which to write the log data dataframe. This will write a single JSON file containing the historical log data.  
This is only used a temporary staging location before the data is repartitioned across a large number of smaller files using Spark.

> **Note:** Your Azure Synapse workspace will need to be able to access this ADLS location in the next step.


In [None]:
staging_account_name = '<storage account name>'  # fill in your primary account name
staging_container_name = '<container name>'  # fill in your container name
staging_dirname = "historical-log-dump" #'<name of directory to write to>'  # fill in your directory name
staging_filename = "signin_logs.json" #'<name of file to write to>'  # fill in your file name (include the .json extension)

# In production, make sure any keys are stored and retrieved securely (e.g. using Azure Key Vault) - don't store keys as plaintext!
staging_account_key = '<storage-account-key>'  # replace your storage account key

upload_df_to_adls_path(
    df,
    adls_dirname=staging_dirname,
    adls_filename=staging_filename,
    container_name=staging_container_name,
    storage_account_name=staging_account_name,
    storage_account_key=staging_account_key,
)

# 3. Repartition Data Using Spark

Now we use PySpark to repartition the exported log data by timestamp in the following format:

    {base_path}/y=<year>/m=<month>/d=<day>/h=<hour>/m=<5-minute-interval>

We also write to the same location used by the continuous log export: the `base_path` used by the continuous log export is:

    WorkspaceResourceId=/subscriptions/{subscription_id}/resourcegroups/{resource_group.lower()}/providers/microsoft.operationalinsights/workspaces/{workspace_name.lower()}

Storing the exported logs data like this has two benefits:

- This matches the partition scheme used by continuously exported logs; this means that means that continuously exported data and historical log data can be read from in a unified way by any notebooks or data pipelines that consume this data
- Partitioning by timestamps can allow for efficient querying of file data - by encoding the timestamp values in file paths, we can minimise the number of required file reads when loading data from a specific time range in downstream tasks 


### Configure Azure ML and Azure Synapse Analytics

To run this Synapse PySpark code, the Synapse integration for AML notebooks needs to be set up: please refer to the template notebook, [Configurate Azure ML and Azure Synapse Analytics](https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/Configurate%20Azure%20ML%20and%20Azure%20Synapse%20Analytics.ipynb), to configure environment. 

The notebook will configure existing Azure synapse workspace to create and connect to Spark pool. You can then create linked service and connect AML workspace to Azure Synapse workspaces.

> **Note**: Specify the input parameters in below step in order to connect AML workspace to synapse workspace using linked service.

In [None]:
aml_workspace = '<aml workspace name>'  # fill in your AML workspace name
subscription_id = '<subscription id>' # fill in your subscription id
resource_group = '<resource group of AML workspace>' # fill in your resource groups for AML workspace
linkedservice = '<linked service name>' # fill in your linked service created to connect to synapse workspace

### Start Spark Session
Enter your Synapse Spark compute below. To find the Spark compute, please follow these steps:

1. On the AML Studio left menu, navigate to **Linked Services**
2. Click on the name of the Link Service you want to use
3. Select **Spark pools** tab
4. Get the name of the Spark pool you want to use

In [None]:
from azureml.core import Workspace, LinkedService


# Get the aml workspace
aml_workspace = Workspace.get(name=aml_workspace, subscription_id=subscription_id, resource_group=resource_group)

# Retrieve a known linked service
linked_service = LinkedService.get(aml_workspace, linkedservice)

# Enter the name of the attached Spark pool
synapse_spark_compute = input('Synapse Spark compute:')

# Start Spark session
%synapse start -s $subscription_id -w $aml_workspace -r $resource_group -c $synapse_spark_compute

### Repartition Data Using PySpark

Having started the Spark session, we can run PySpark code by starting a cell with the `%%synapse` line magic.  
Doing this part of the data ETL process using Spark allows the partitioning and writing of data to be hugely parallelized - for a year's worth of log data, we may be creating over 100,000 data files (one per partition).

First we read in the historical log data from ADLS (where we exported the data to in step 2.) into the Spark context.

> Fill in the details of the sotrage account, ADLS container and directory/file to which the historical logs were exported in step 2.

In [None]:
%%synapse

# Fill in the details of the ADLS location where we have dumped the historical log data
staging_account_name = '<storage account name>'  # fill in your primary account name
staging_container_name = '<container name>'  # fill in your container name
staging_dirname = "historical-log-dump"  # Name of directory to which logs were written in step 2.
staging_filename = "signin_logs.json"  # Name of file to which logs were written in step 2.

historical_logs_adls_path = (
    f"abfss://{staging_container_name}@{staging_account_name}.dfs.core.windows.net/"
    f"{staging_dirname}/{staging_filename}"
)

df = spark.read.json(historical_logs_adls_path)

Next, we use the `TimeGenerated` column to create the year, month, day, etc. columns we will use as partition keys.

In [None]:
%%synapse

from pyspark.sql.functions import col, lit
import pyspark.sql.functions as F

time_col = col('TimeGenerated')
year_col = F.year(time_col).alias('y')
month_col = F.month(time_col).alias('month')
day_col = F.dayofmonth(time_col).alias('d')
hour_col = F.hour(time_col).alias('h')
minute_col = F.minute(time_col)
five_min_bucket_col = (minute_col - (minute_col % 5)).alias('m')

df = df.select('*', year_col, month_col, day_col, hour_col, five_min_bucket_col)

partition_col_names = ['y', 'month', 'd', 'h', 'm']
df.select(partition_col_names).show(5)

In [None]:
# Optional - how many files will be written? (This could be over 100,000 files for a year's worth of data!)

# %%synapse
# df.select(partition_col_names).distinct().count()

Here we repartition the data on the partition keys created above, then write out one file per partition to the same location as the continuous log export. This will mean that continuously exported data and historical log data can be read from in a unified way and can also increase read performance for future use of the exported data.

> Fill in the details of the storage account, ADLS container to which to write the repartitioned data (usually you will want this to be the same container to which the continuous export tool is writing logs). 
> Also filling in the Sentinel workspace subscription, resource group and workspace name ensures that logs are written to the same path as continuously exported logs.

In [None]:
%%synapse

# Fill in your ADLS account and container and your Sentinel ws subscription, rg and ws name
account_name = '<storage account name>' # fill in your primary account name
container_name = '<container name>' # fill in your container name
subscription_id = '<subscription id>' # fill in the subscription id of your Sentinel workspace
resource_group = '<resource group>' # fill in your resource groups for your Sentinel workspace
workspace_name = '<Microsoft sentinel/log analytics workspace name>' # fill in your workspace name

continuous_export_base_path = (
    f"abfss://{container_name}@{account_name}.dfs.core.windows.net/WorkspaceResourceId=/"
    f"subscriptions/{subscription_id}/"
    f"resourcegroups/{resource_group.lower()}/"
    f"providers/microsoft.operationalinsights/"
    f"workspaces/{workspace_name.lower()}"
)

# The extra use of `repartition` here ensures that we only write one file per partition
df.repartition(*partition_col_names).write.partitionBy(partition_col_names).json(continuous_export_base_path, mode='append')

### Stop Spark Session

In [None]:
%synapse stop

# 4. Rename Some Directories

There is one remaining difference between the partition scheme used by the continuous export and the historical data we have exported - namely, the "month" column needs to be renamed to "m". We can make this change by using the ADLS Python SDK.  
That is, we need to change the directory structure from `{base_path}/y=<year>/`**`month=<month>`**`/d=<day>/h=<hour>/m=<5-minute-interval>`  to `{base_path}/y=<year>/`**`m=<month>`**`/d=<day>/h=<hour>/m=<5-minute-interval>`

> **Note:** This type of change can be done efficiently due to the nature of the ADLS gen2 hierarchical filesystem (see [details](https://docs.microsoft.com/azure/storage/blobs/data-lake-storage-namespace)). 
> Doing a similar task on a standard blob storage container would be much slower since the pseudo-filesystem means that each of the blobs within the high-level 'directory' would need to be individually renamed under the hood.

In [None]:
# Fill in the details of the storage account and ADLS container to which the repartitioned log data was written
account_name = '<storage account name>'  # fill in your  account name
container_name = '<container name>'  # fill in your container name

# In production, make sure any keys are stored and retrieved securely (e.g. using Azure Key Vault) - don't store keys as plaintext!
account_key = '<storage-account-key>'  # replace your storage account key

# Fill in the details of your **Sentinel workspace**
subscription_id = '<subscription id>' # fill in the subscription id of your Sentinel workspace
resource_group = '<resource group>' # fill in your resource groups for your Sentinel workspace
workspace_name = '<Microsoft sentinel/log analytics workspace name>' # fill in your workspace name

continuous_export_base_path = (
    f"abfss://{container_name}@{account_name}.dfs.core.windows.net/WorkspaceResourceId=/"
    f"subscriptions/{subscription_id}/"
    f"resourcegroups/{resource_group.lower()}/"
    f"providers/microsoft.operationalinsights/"
    f"workspaces/{workspace_name.lower()}"
)

rename_month_partition_dirs(
    adls_root_search_path=continuous_export_base_path,
    container_name=container_name,
    storage_account_name=account_name,
    storage_account_key=account_key,
)

# 5. Cleanup

Our exported historical log data is now ready. We can now remove any data from the intermediate staging location.

> **Note:** You may want to navigate to your Azure storage account and check that the data has been written correctly before deleting any data.

In [None]:
delete_directory(
    adls_dirname=staging_dirname,
    container_name=staging_container_name,
    storage_account_name=staging_account_name,
    storage_account_key=staging_account_key
)